package collectors

import (
	"context"
	"encoding/json"
	"github.com/Shopify/sarama"
	"github.com/go-kit/kit/log"
)

// CollectorReporterRepository is the reporting surface of this package. Every
// method takes a context plus a single report value and returns an error.
type CollectorReporterRepository interface {
	// ReportTextMessage and ReportOtherMessage both accept a MessageReport;
	// the Kafka-backed implementation in this file routes them to two
	// separate topics.
	ReportTextMessage(ctx context.Context, report MessageReport) error
	ReportOtherMessage(ctx context.Context, report MessageReport) error
	// Collector join/leave reports.
	ReportCollectorJoinChatroom(ctx context.Context, report CollectorJoinChatroomReport) error
	ReportCollectorLeaveChatroom(ctx context.Context, report CollectorLeaveChatroomReport) error
	// Member join/leave reports.
	ReportMemberJoinChatroom(ctx context.Context, report MemberJoinChatroomReport) error
	ReportMemberLeaveChatroom(ctx context.Context, report MemberLeaveChatroomReport) error
	// ReportChatroomMember submits a ChatroomMemberReport.
	ReportChatroomMember(ctx context.Context, report ChatroomMemberReport) error
	// ReportLog submits a LogReport.
	ReportLog(ctx context.Context, report LogReport) error
	// ReportHeartbeat submits a HeartBeatReport.
	ReportHeartbeat(ctx context.Context, report HeartBeatReport) error
}

// kafkaCollectorReporterRepository implements CollectorReporterRepository by
// JSON-encoding each report and handing it to a sarama.AsyncProducer. Each
// kind of report is published to its own topic.
type kafkaCollectorReporterRepository struct {
	// logger receives one entry (topic and payload) per outgoing message.
	logger                      log.Logger
	// producer is the asynchronous Kafka producer messages are enqueued on.
	producer                    sarama.AsyncProducer
	// Destination topic for each Report* method.
	textMessageTopic            string
	otherMessageTopic           string
	collectorJoinChatroomTopic  string
	collectorLeaveChatroomTopic string
	memberJoinChatroomTopic     string
	memberLeaveChatroomTopic    string
	chatroomMemberTopic         string
	logTopic                    string
	heartBeatTopic              string
}

// NewKafkaCollectorReporterRepositoryOpts carries the Kafka topic names
// consumed by NewKafkaCollectorReporterRepository. Each field is copied
// unchanged into the repository's field of the same (lower-cased) name.
type NewKafkaCollectorReporterRepositoryOpts struct {
	TextMessageTopic            string
	OtherMessageTopic           string
	CollectorJoinChatroomTopic  string
	CollectorLeaveChatroomTopic string
	MemberJoinChatroomTopic     string
	MemberLeaveChatroomTopic    string
	ChatroomMemberTopic         string
	LogTopic                    string
	HeartBeatTopic              string
}

// NewKafkaCollectorReporterRepository builds a CollectorReporterRepository
// that publishes through producer. The logger and producer are stored as
// given, and every topic name in opts is copied onto the repository.
func NewKafkaCollectorReporterRepository(logger log.Logger, producer sarama.AsyncProducer, opts NewKafkaCollectorReporterRepositoryOpts) CollectorReporterRepository {
	repo := new(kafkaCollectorReporterRepository)

	repo.logger = logger
	repo.producer = producer

	// Topic names are taken from opts verbatim; no validation or defaulting.
	repo.textMessageTopic = opts.TextMessageTopic
	repo.otherMessageTopic = opts.OtherMessageTopic
	repo.collectorJoinChatroomTopic = opts.CollectorJoinChatroomTopic
	repo.collectorLeaveChatroomTopic = opts.CollectorLeaveChatroomTopic
	repo.memberJoinChatroomTopic = opts.MemberJoinChatroomTopic
	repo.memberLeaveChatroomTopic = opts.MemberLeaveChatroomTopic
	repo.chatroomMemberTopic = opts.ChatroomMemberTopic
	repo.logTopic = opts.LogTopic
	repo.heartBeatTopic = opts.HeartBeatTopic

	return repo
}

// send logs the outgoing payload and enqueues it on the async producer's
// input channel under the given topic.
//
// Delivery is asynchronous: a nil return only means the message was handed
// to the producer, not that a broker acknowledged it.
//
// If the producer can accept the message immediately it is always enqueued,
// even when ctx is already done. Only when the enqueue would block does send
// wait on ctx, returning ctx.Err() if ctx is done before the producer accepts
// the message. A nil ctx is treated as context.Background().
func (r *kafkaCollectorReporterRepository) send(ctx context.Context, topic string, value []byte) error {
	if ctx == nil {
		// The previous implementation never touched ctx, so nil was tolerated;
		// keep tolerating it instead of panicking on ctx.Done().
		ctx = context.Background()
	}

	// Logging is best-effort: a logger failure must not prevent publishing.
	_ = r.logger.Log("topic", topic, "value", string(value))

	msg := &sarama.ProducerMessage{
		Topic: topic,
		Value: sarama.ByteEncoder(value),
	}
	input := r.producer.Input()

	// Fast path: if the producer is ready, enqueue regardless of ctx state so
	// that a ready producer never loses a message to select's random choice.
	select {
	case input <- msg:
		return nil
	default:
	}

	// Slow path: the enqueue would block, so let cancellation or a deadline
	// release the caller instead of hanging indefinitely.
	select {
	case input <- msg:
		return nil
	case <-ctx.Done():
		return ctx.Err()
	}
}

// ReportTextMessage JSON-encodes report and publishes it to the text message
// topic. It returns the encoding error if there is one, otherwise whatever
// send returns.
func (r *kafkaCollectorReporterRepository) ReportTextMessage(ctx context.Context, report MessageReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.textMessageTopic
	return r.send(ctx, topic, payload)
}

// ReportOtherMessage JSON-encodes report and publishes it to the other
// message topic. It returns the encoding error if there is one, otherwise
// whatever send returns.
func (r *kafkaCollectorReporterRepository) ReportOtherMessage(ctx context.Context, report MessageReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.otherMessageTopic
	return r.send(ctx, topic, payload)
}

// ReportCollectorJoinChatroom JSON-encodes report and publishes it to the
// collector-join-chatroom topic. It returns the encoding error if there is
// one, otherwise whatever send returns.
func (r *kafkaCollectorReporterRepository) ReportCollectorJoinChatroom(ctx context.Context, report CollectorJoinChatroomReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.collectorJoinChatroomTopic
	return r.send(ctx, topic, payload)
}

// ReportCollectorLeaveChatroom JSON-encodes report and publishes it to the
// collector-leave-chatroom topic. It returns the encoding error if there is
// one, otherwise whatever send returns.
func (r *kafkaCollectorReporterRepository) ReportCollectorLeaveChatroom(ctx context.Context, report CollectorLeaveChatroomReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.collectorLeaveChatroomTopic
	return r.send(ctx, topic, payload)
}

// ReportMemberJoinChatroom JSON-encodes report and publishes it to the
// member-join-chatroom topic. It returns the encoding error if there is one,
// otherwise whatever send returns.
func (r *kafkaCollectorReporterRepository) ReportMemberJoinChatroom(ctx context.Context, report MemberJoinChatroomReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.memberJoinChatroomTopic
	return r.send(ctx, topic, payload)
}

// ReportMemberLeaveChatroom JSON-encodes report and publishes it to the
// member-leave-chatroom topic. It returns the encoding error if there is one,
// otherwise whatever send returns.
func (r *kafkaCollectorReporterRepository) ReportMemberLeaveChatroom(ctx context.Context, report MemberLeaveChatroomReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.memberLeaveChatroomTopic
	return r.send(ctx, topic, payload)
}

// ReportChatroomMember JSON-encodes report and publishes it to the chatroom
// member topic. It returns the encoding error if there is one, otherwise
// whatever send returns.
func (r *kafkaCollectorReporterRepository) ReportChatroomMember(ctx context.Context, report ChatroomMemberReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.chatroomMemberTopic
	return r.send(ctx, topic, payload)
}

// ReportLog JSON-encodes report and publishes it to the log topic. It returns
// the encoding error if there is one, otherwise whatever send returns.
func (r *kafkaCollectorReporterRepository) ReportLog(ctx context.Context, report LogReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.logTopic
	return r.send(ctx, topic, payload)
}

// ReportHeartbeat JSON-encodes report and publishes it to the heartbeat
// topic. It returns the encoding error if there is one, otherwise whatever
// send returns.
func (r *kafkaCollectorReporterRepository) ReportHeartbeat(ctx context.Context, report HeartBeatReport) error {
	payload, encodeErr := json.Marshal(report)
	if encodeErr != nil {
		// Nothing is published when the report cannot be encoded.
		return encodeErr
	}

	topic := r.heartBeatTopic
	return r.send(ctx, topic, payload)
}
